package io.nutz.irtu.gpsserver;

import java.io.IOException;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.nutz.aop.interceptor.async.Async;
import org.nutz.dao.Chain;
import org.nutz.dao.Cnd;
import org.nutz.dao.Dao;
import org.nutz.ioc.impl.PropertiesProxy;
import org.nutz.ioc.loader.annotation.Inject;
import org.nutz.ioc.loader.annotation.IocBean;
import org.nutz.json.Json;
import org.nutz.lang.Lang;
import org.nutz.lang.util.NutMap;
import org.nutz.log.Log;
import org.nutz.log.Logs;
import org.smartboot.socket.MessageProcessor;
import org.smartboot.socket.Protocol;
import org.smartboot.socket.StateMachineEnum;
import org.smartboot.socket.transport.AioQuickServer;
import org.smartboot.socket.transport.AioSession;

import io.nutz.irtu.gpsserver.bean.AbstractDevMsg;
import io.nutz.irtu.gpsserver.bean.DevInfo;
import io.nutz.irtu.gpsserver.bean.DevLocMsg;
import io.nutz.irtu.gpsserver.bean.DevStatusMsg;

@IocBean(create="init", depose="depose")
public class GpsServer implements MessageProcessor<byte[]>, Protocol<byte[]> {
    
    private static final Log log = Logs.get();

    @Inject
    protected PropertiesProxy conf;
    
    @Inject
    protected Dao dao;
    
    protected AioQuickServer<byte[]> srv;
    
    @Inject
    protected SimpleWsEndpoint endpoint;
    
    protected Map<String, WeakReference<AioSession<byte[]>>> sessions;
    
    public void init() throws IOException {
        sessions = new HashMap<>();
        srv = new AioQuickServer<>(conf.getInt("gps.port", 19002), this, this);
        srv.start();
    }
    
    public void depose() {
        if (srv != null)
            srv.shutdown();
    }
    
    public byte[] decode(ByteBuffer buffer, AioSession<byte[]> session) {
        try {
            ByteBuffer prev = session.getAttachment();
            byte[] tmp = new byte[buffer.remaining()];
            buffer.get(tmp);
            if (prev == null) {
                buffer = ByteBuffer.wrap(tmp);
            }
            else {
                buffer = ByteBuffer.allocate(prev.remaining() + tmp.length);
                buffer.position(0);
                byte[] tmp2 = new byte[prev.remaining()];
                prev.get(tmp2);
                buffer.put(tmp2);
                buffer.put(tmp);
                buffer.position(0);
            }
            OUT: while (buffer.hasRemaining()) {
                //System.out.println(buffer.position() + "," + buffer.limit());
                buffer.mark();
                byte firstByte = buffer.get();
                switch (firstByte) {
                case '{':{
                    int len = 1;
                    while (buffer.hasRemaining()) {
                        byte t = buffer.get();
                        len ++;
                        if (t == '}') {
                            byte[] buf = new byte[len];
                            buffer.reset();
                            buffer.get(buf, 0, buf.length);
                            buffer.mark();
                            onMsg(session, buf);
                            continue OUT;
                        }
                    }
                    // 半包数据
                    buffer.reset();
                    session.setAttachment(buffer.slice());
                    return null;
                }
                // 设备状态
                case 0x55: { // >b7IHb  ==  1*7+4+2+1 = 14
                    if (buffer.remaining() >= 13) {
                        byte[] buf = new byte[14];
                        buffer.reset();
                        buffer.get(buf);
                        buffer.mark();
                        onMsg(session, buf);
                        continue OUT;
                    }
                    // 半包数据
                    session.setAttachment(buffer.slice());
                    return null;
                }
                // 位置报告
                case (byte) 0xAA:{ // b2i3H2b3 == 1*2+4*3+2*2+1*3 == 2+12+4+3 = 21
                    if (buffer.remaining() >= 20) {
                        byte[] buf = new byte[21];
                        buffer.reset();
                        buffer.get(buf);
                        buffer.mark();
                        onMsg(session, buf);
                        continue OUT;
                    }
                    // 半包数据
                    session.setAttachment(buffer.flip());
                    return null;
                }
                default:
                    // nop
                    break;
                }
            }
        }
        catch (Exception e) {
            log.debug("出错了", e);
        }
        session.setAttachment(null);
        return null;
    }
    
    @Async
    public void onMsg(AioSession<byte[]> session, byte[] msg) {
        process(session, msg);
    }

    @Override
    public void process(AioSession<byte[]> session, byte[] msg) {
        try {
            switch (msg[0]) {
            case '{':{
                String str = new String(msg).trim();
                log.debugf("JSON信息=%s", str);
                NutMap m = Json.fromJson(NutMap.class, str);
                String imei = m.getString("imei");
                String iccid = m.getString("iccid");
                if (imei != null && imei.length() >= 15) {
                    session.getAttrs().putAll(m);
                    DevInfo dev = dao.fetch(DevInfo.class, imei);
                    if (dev == null) {
                        log.debugf("新增设备 imei=%s iccid=%s", imei, iccid);
                        try {
                            dev = new DevInfo();
                            dev.setImei(imei);
                            dev.setIccid(iccid);
                            dev.setTags("");
                            dev.setNickname(imei);
                            dao.insert(dev);
                        }
                        catch (Throwable e) {
                            log.warn("创建设备失败", e);
                        }
                    }
                    // 设备换卡
                    else if (iccid != null && !iccid.equals(dev.getIccid())) {
                        log.debugf("设备换卡了 imei=%s iccid=%s", imei, iccid);
                        dao.update(DevInfo.class, Chain.make("iccid", iccid), Cnd.where("imei", "=", imei));
                    }
                    // 记录连接的id
                    dao.update(DevInfo.class, Chain.make("sessionID", session.getSessionID()), Cnd.where("imei", "=", imei));
                }
                else if (m.containsKey("msg")) {
                    DevLocMsg devmsg = DevLocMsg.from(m.getList("msg", Object.class));
                    devmsg.setImei(session.getAttr("imei"));
                    record(devmsg);
                    //log.debug("location " + Json.toJson(loc));
                }
                else if (m.containsKey("sta")) {
                    DevStatusMsg devmsg = DevStatusMsg.from(m.getList("sta", Object.class));
                    devmsg.setImei(session.getAttr("imei"));
                    record(devmsg);
                    //log.debug("status " + Json.toJson(stat));
                }
                break;
            }
            // 设备状态
            case 0x55: { // >b7IHb  ==  1*7+4+2+1 = 14
                log.debug("设备状态" + Lang.fixedHexString(msg));
                //log.debug("status " + Json.toJson(DevStatusMsg.from(msg)));
                DevStatusMsg devmsg = DevStatusMsg.from(msg);
                devmsg.setImei(session.getAttr("imei"));
                record(devmsg);
                break;
            }
            // 位置报告
            case (byte) 0xAA:{ // b2i3H2b3 == 1*2+4*3+2*2+1*3 == 2+12+4+3 = 19
                log.debug("位置报告" + Lang.fixedHexString(msg));
                //log.debug("location " + Json.toJson(DevLocMsg.from(msg)));
                DevLocMsg devmsg = DevLocMsg.from(msg);
                devmsg.setImei(session.getAttr("imei"));
                record(devmsg);
                break;
            }
            }
        }
        catch (Exception e) {
            log.debug("出错了", e);
        }
    }
    
    @Async
    public void record(AbstractDevMsg msg) {
        if (msg instanceof DevLocMsg) {
            if (((DevLocMsg)msg).isFixed()) {
                dao.insert(msg);
            }
            toWsRoom(msg.getImei(), new NutMap("loc", msg));
        }
        else if (msg instanceof DevStatusMsg) {
            //dao.insert(msg);
            // TODO 怎么处理这个设备状态呢
            toWsRoom(msg.getImei(), new NutMap("sta", msg));
        }
    }
    
    protected void toWsRoom(String room, NutMap map) {
        endpoint.each(room, (index,ele,count)->{
            try {
                endpoint.sendJsonSync(ele.getId(), map);
            }
            catch (Exception e) {
            }
        });
    }

    @Override
    public void stateEvent(AioSession<byte[]> session, StateMachineEnum stateMachineEnum, Throwable throwable) {
        switch (stateMachineEnum) {
        case NEW_SESSION:
            log.debug("新连接 session id=" + session.getSessionID());
            sessions.put(session.getSessionID(), new WeakReference<AioSession<byte[]>>(session));
            break;
        case SESSION_CLOSED:
            log.debug("连接关闭 session id=" + session.getSessionID());
            sessions.remove(session.getSessionID());
            break;

        default:
            break;
        }
    }
    
    public String sendMsg(String sessionID, byte[] data) {
        WeakReference<AioSession<byte[]>> ref = sessions.get(sessionID);
        if (ref == null) {
            log.debug("设备对应的session不存在");
            return "no_such_session";
        }
        AioSession<byte[]> session = ref.get();
        if (session.isInvalid()) {
            log.debug("设备的连接已中断");
            return "session_invalid";
        }
        try {
            session.writeBuffer().writeAndFlush(data);
        }
        catch (Exception e) {
            return "session_write_fail";
        }
        return null;
    }
}
